{
  "nbformat": 4,
  "nbformat_minor": 0,
  "metadata": {
    "colab": {
      "name": "Windowing -- Tour of Beam",
      "provenance": [],
      "collapsed_sections": [],
      "toc_visible": true
    },
    "kernelspec": {
      "name": "python3",
      "display_name": "Python 3"
    }
  },
  "cells": [
    {
      "cell_type": "code",
      "metadata": {
        "cellView": "form",
        "id": "upmJn_DjcThx"
      },
      "source": [
        "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
        "\n",
        "# Licensed to the Apache Software Foundation (ASF) under one\n",
        "# or more contributor license agreements. See the NOTICE file\n",
        "# distributed with this work for additional information\n",
        "# regarding copyright ownership. The ASF licenses this file\n",
        "# to you under the Apache License, Version 2.0 (the\n",
        "# \"License\"); you may not use this file except in compliance\n",
        "# with the License. You may obtain a copy of the License at\n",
        "#\n",
        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
        "#\n",
        "# Unless required by applicable law or agreed to in writing,\n",
        "# software distributed under the License is distributed on an\n",
        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
        "# KIND, either express or implied. See the License for the\n",
        "# specific language governing permissions and limitations\n",
        "# under the License."
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "5UC_aGanx6oE"
      },
      "source": [
        "# Windowing -- _Tour of Beam_\n",
        "\n",
        "Sometimes, we want to [aggregate](https://beam.apache.org/documentation/transforms/python/overview/#aggregation) data, like `GroupByKey` or `Combine`, only at certain intervals, like hourly or daily, instead of processing the entire `PCollection` of data only once.\n",
        "\n",
        "We might want to emit a [moving average](https://en.wikipedia.org/wiki/Moving_average) as we're processing data.\n",
        "\n",
        "Maybe we want to analyze the user experience for a certain task in a web app, it would be nice to get the app events by sessions of activity.\n",
        "\n",
        "Or we could be running a streaming pipeline, and there is no end to the data, so how can we aggregate data?\n",
        "\n",
        "_Windows_ in Beam allow us to process only certain data intervals at a time.\n",
        "In this notebook, we go through different ways of windowing our pipeline.\n",
        "\n",
        "Lets begin by installing `apache-beam`."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "R_Yhoc6N_Flg"
      },
      "source": [
        "# Install apache-beam with pip.\n",
        "!pip install --quiet apache-beam"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "_OkWHiAvpWDZ"
      },
      "source": [
        "First, lets define some helper functions to simplify the rest of the examples.\n",
        "\n",
        "We have a transform to help us analyze an element alongside its window information, and we have another transform to help us analyze how many elements landed into each window.\n",
        "We use a custom [`DoFn`](https://beam.apache.org/documentation/transforms/python/elementwise/pardo)\n",
        "to access that information.\n",
        "\n",
        "You don't need to understand these, you just need to know they exist 🙂."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "C9yAN1Hgk3dF"
      },
      "source": [
        "import apache_beam as beam\n",
        "\n",
        "def human_readable_window(window) -> str:\n",
        "  \"\"\"Formats a window object into a human readable string.\"\"\"\n",
        "  if isinstance(window, beam.window.GlobalWindow):\n",
        "    return str(window)\n",
        "  return f'{window.start.to_utc_datetime()} - {window.end.to_utc_datetime()}'\n",
        "\n",
        "class PrintElementInfo(beam.DoFn):\n",
        "  \"\"\"Prints an element with its Window information.\"\"\"\n",
        "  def process(self, element, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):\n",
        "    print(f'[{human_readable_window(window)}] {timestamp.to_utc_datetime()} -- {element}')\n",
        "    yield element\n",
        "\n",
        "@beam.ptransform_fn\n",
        "def PrintWindowInfo(pcollection):\n",
        "  \"\"\"Prints the Window information with how many elements landed in that window.\"\"\"\n",
        "  class PrintCountsInfo(beam.DoFn):\n",
        "    def process(self, num_elements, window=beam.DoFn.WindowParam):\n",
        "      print(f'>> Window [{human_readable_window(window)}] has {num_elements} elements')\n",
        "      yield num_elements\n",
        "\n",
        "  return (\n",
        "      pcollection\n",
        "      | 'Count elements per window' >> beam.combiners.Count.Globally().without_defaults()\n",
        "      | 'Print counts info' >> beam.ParDo(PrintCountsInfo())\n",
        "  )"
      ],
      "execution_count": 1,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "CQrojV2QnqIU"
      },
      "source": [
        "Now lets create some data to use in the examples.\n",
        "\n",
        "Windows define data intervals based on time, so we need to tell Apache Beam a timestamp for each element.\n",
        "\n",
        "We define a `PTransform` for convenience, so we can attach the timestamps automatically.\n",
        "\n",
        "Apache Beam requires us to provide the timestamp as [Unix time](https://en.wikipedia.org/wiki/Unix_time), which is a way to represent a date and time as the number of seconds since January 1st, 1970.\n",
        "\n",
        "For our data, lets analyze some events about the seasons and moon phases for the year 2021, which might be [useful for a gardening project](https://www.almanac.com/content/planting-by-the-moon).\n",
        "\n",
        "To attach timestamps to each element, we can `Map` each element and return a [`TimestmpedValue`](https://beam.apache.org/documentation/transforms/python/elementwise/withtimestamps/)."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "Sgzscopvmh1f",
        "outputId": "e0c6fc19-ab97-4754-8f1f-1601807be940"
      },
      "source": [
        "import time\n",
        "from apache_beam.options.pipeline_options import PipelineOptions\n",
        "\n",
        "def to_unix_time(time_str: str, time_format='%Y-%m-%d %H:%M:%S') -> int:\n",
        "  \"\"\"Converts a time string into Unix time.\"\"\"\n",
        "  time_tuple = time.strptime(time_str, time_format)\n",
        "  return int(time.mktime(time_tuple))\n",
        "\n",
        "@beam.ptransform_fn\n",
        "@beam.typehints.with_input_types(beam.pvalue.PBegin)\n",
        "@beam.typehints.with_output_types(beam.window.TimestampedValue)\n",
        "def AstronomicalEvents(pipeline):\n",
        "  return (\n",
        "      pipeline\n",
        "      | 'Create data' >> beam.Create([\n",
        "          ('2021-03-20 03:37:00', 'March Equinox 2021'),\n",
        "          ('2021-04-26 22:31:00', 'Super full moon'),\n",
        "          ('2021-05-11 13:59:00', 'Micro new moon'),\n",
        "          ('2021-05-26 06:13:00', 'Super full moon, total lunar eclipse'),\n",
        "          ('2021-06-20 22:32:00', 'June Solstice 2021'),\n",
        "          ('2021-08-22 07:01:00', 'Blue moon'),\n",
        "          ('2021-09-22 14:21:00', 'September Equinox 2021'),\n",
        "          ('2021-11-04 15:14:00', 'Super new moon'),\n",
        "          ('2021-11-19 02:57:00', 'Micro full moon, partial lunar eclipse'),\n",
        "          ('2021-12-04 01:43:00', 'Super new moon'),\n",
        "          ('2021-12-18 10:35:00', 'Micro full moon'),\n",
        "          ('2021-12-21 09:59:00', 'December Solstice 2021'),\n",
        "      ])\n",
        "      | 'With timestamps' >> beam.MapTuple(\n",
        "          lambda timestamp, element:\n",
        "              beam.window.TimestampedValue(element, to_unix_time(timestamp))\n",
        "      )\n",
        "  )\n",
        "\n",
        "# Lets see how the data looks like.\n",
        "beam_options = PipelineOptions(flags=[], type_check_additional='all')\n",
        "with beam.Pipeline(options=beam_options) as pipeline:\n",
        "  (\n",
        "      pipeline\n",
        "      | 'Astronomical events' >> AstronomicalEvents()\n",
        "      | 'Print element' >> beam.Map(print)\n",
        "  )"
      ],
      "execution_count": 3,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "March Equinox 2021\n",
            "Super full moon\n",
            "Micro new moon\n",
            "Super full moon, total lunar eclipse\n",
            "June Solstice 2021\n",
            "Blue moon\n",
            "September Equinox 2021\n",
            "December Solstice 2021\n",
            "Super new moon\n",
            "Micro full moon, partial lunar eclipse\n",
            "Super new moon\n",
            "Micro full moon\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "qI0K3jSA2LbJ"
      },
      "source": [
        "> ℹ️ After running this, it looks like the timestamps disappeared!\n",
        "> They're actually still _implicitly_ part of the element, just like the windowing information.\n",
        "> If we need to access it, we can do so via a custom [`DoFn`](https://beam.apache.org/documentation/transforms/python/elementwise/pardo).\n",
        "> Aggregation transforms use each element's timestamp along with the windowing we specified to create windows of elements."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "ymHF1WCqnG4V"
      },
      "source": [
        "# Global window\n",
        "\n",
        "All pipelines use the [`GlobalWindow`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.window.html#apache_beam.transforms.window.GlobalWindow) by default.\n",
        "This is a single window that covers the entire `PCollection`.\n",
        "\n",
        "In many cases, especially for batch pipelines, this is what we want since we want to analyze all the data that we have.\n",
        "\n",
        "> ℹ️ `GlobalWindow` is not very useful in a streaming pipeline unless you only need element-wise transforms.\n",
        "> Aggregations, like `GroupByKey` and `Combine`, need to process the entire window, but a streaming pipeline has no end, so they would never finish."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "xDXdE9uysriw",
        "outputId": "b39e7fe7-dc13-4d77-89af-f2d1312ab673"
      },
      "source": [
        "import apache_beam as beam\n",
        "\n",
        "# All elements fall into the GlobalWindow by default.\n",
        "with beam.Pipeline() as pipeline:\n",
        "  (\n",
        "      pipeline\n",
        "      | 'Astrolonomical events' >> AstronomicalEvents()\n",
        "      | 'Print element info' >> beam.ParDo(PrintElementInfo())\n",
        "      | 'Print window info' >> PrintWindowInfo()\n",
        "  )"
      ],
      "execution_count": 4,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "[GlobalWindow] 2021-03-20 03:37:00 -- March Equinox 2021\n",
            "[GlobalWindow] 2021-04-26 22:31:00 -- Super full moon\n",
            "[GlobalWindow] 2021-05-11 13:59:00 -- Micro new moon\n",
            "[GlobalWindow] 2021-05-26 06:13:00 -- Super full moon, total lunar eclipse\n",
            "[GlobalWindow] 2021-06-20 22:32:00 -- June Solstice 2021\n",
            "[GlobalWindow] 2021-08-22 07:01:00 -- Blue moon\n",
            "[GlobalWindow] 2021-09-22 14:21:00 -- September Equinox 2021\n",
            "[GlobalWindow] 2021-12-21 09:59:00 -- December Solstice 2021\n",
            "[GlobalWindow] 2021-11-04 15:14:00 -- Super new moon\n",
            "[GlobalWindow] 2021-11-19 02:57:00 -- Micro full moon, partial lunar eclipse\n",
            "[GlobalWindow] 2021-12-04 01:43:00 -- Super new moon\n",
            "[GlobalWindow] 2021-12-18 10:35:00 -- Micro full moon\n",
            ">> Window [GlobalWindow] has 12 elements\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "l3Kod_pR7a7S"
      },
      "source": [
        "![Global window]()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "7WkYLzFCo4Rl"
      },
      "source": [
        "# Fixed time windows\n",
        "\n",
        "If we want to analyze our data hourly, daily, monthly, etc. We might want to create evenly spaced intervals.\n",
        "\n",
        "[`FixedWindows`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.window.html#apache_beam.transforms.window.FixedWindows)\n",
        "allow us to create fixed-sized windows.\n",
        "We only need to specify the _window size_ in seconds.\n",
        "\n",
        "In Python, we can use [`timedelta`](https://docs.python.org/3/library/datetime.html#timedelta-objects)\n",
        "to help us do the conversion of minutes, hours, or days for us.\n",
        "\n",
        "> ℹ️ Some time deltas like a month cannot be so easily converted into seconds, since a month can have from 28 to 31 days.\n",
        "> Sometimes using an estimate like 30 days in a month is enough.\n",
        "\n",
        "We must use the [`WindowInto`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html?highlight=windowinto#apache_beam.transforms.core.WindowInto)\n",
        "transform to apply the kind of window we want."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "amZCkPNZ5gFQ",
        "outputId": "8e6c0a13-3f19-4452-ce4e-08b74b24798e"
      },
      "source": [
        "import apache_beam as beam\n",
        "from datetime import timedelta\n",
        "\n",
        "# Fixed-sized windows of approximately 3 months.\n",
        "window_size = timedelta(days=3*30).total_seconds()  # in seconds\n",
        "print(f'window_size: {window_size} seconds')\n",
        "\n",
        "with beam.Pipeline() as pipeline:\n",
        "  elements = (\n",
        "      pipeline\n",
        "      | 'Astronomical events' >> AstronomicalEvents()\n",
        "      | 'Fixed windows' >> beam.WindowInto(beam.window.FixedWindows(window_size))\n",
        "      | 'Print element info' >> beam.ParDo(PrintElementInfo())\n",
        "      | 'Print window info' >> PrintWindowInfo()\n",
        "  )"
      ],
      "execution_count": 7,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "window_size: 7776000.0 seconds\n",
            "[2021-01-03 00:00:00 - 2021-04-03 00:00:00] 2021-03-20 03:37:00 -- March Equinox 2021\n",
            "[2021-04-03 00:00:00 - 2021-07-02 00:00:00] 2021-04-26 22:31:00 -- Super full moon\n",
            "[2021-04-03 00:00:00 - 2021-07-02 00:00:00] 2021-05-11 13:59:00 -- Micro new moon\n",
            "[2021-04-03 00:00:00 - 2021-07-02 00:00:00] 2021-05-26 06:13:00 -- Super full moon, total lunar eclipse\n",
            "[2021-04-03 00:00:00 - 2021-07-02 00:00:00] 2021-06-20 22:32:00 -- June Solstice 2021\n",
            "[2021-07-02 00:00:00 - 2021-09-30 00:00:00] 2021-08-22 07:01:00 -- Blue moon\n",
            "[2021-07-02 00:00:00 - 2021-09-30 00:00:00] 2021-09-22 14:21:00 -- September Equinox 2021\n",
            "[2021-09-30 00:00:00 - 2021-12-29 00:00:00] 2021-12-21 09:59:00 -- December Solstice 2021\n",
            "[2021-09-30 00:00:00 - 2021-12-29 00:00:00] 2021-11-04 15:14:00 -- Super new moon\n",
            "[2021-09-30 00:00:00 - 2021-12-29 00:00:00] 2021-11-19 02:57:00 -- Micro full moon, partial lunar eclipse\n",
            "[2021-09-30 00:00:00 - 2021-12-29 00:00:00] 2021-12-04 01:43:00 -- Super new moon\n",
            "[2021-09-30 00:00:00 - 2021-12-29 00:00:00] 2021-12-18 10:35:00 -- Micro full moon\n",
            ">> Window [2021-01-03 00:00:00 - 2021-04-03 00:00:00] has 1 elements\n",
            ">> Window [2021-04-03 00:00:00 - 2021-07-02 00:00:00] has 4 elements\n",
            ">> Window [2021-07-02 00:00:00 - 2021-09-30 00:00:00] has 2 elements\n",
            ">> Window [2021-09-30 00:00:00 - 2021-12-29 00:00:00] has 5 elements\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "4ijww4Vq7jO7"
      },
      "source": [
        "![Fixed windows]()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "1_Vdnmazo5ot"
      },
      "source": [
        "# Sliding time windows\n",
        "\n",
        "Maybe we want a fixed-sized window, but we don't want to wait until a window finishes so we can start the new one.\n",
        "We might want to calculate a moving average.\n",
        "\n",
        "For example, lets say we want to analyze our data for the last three months, but we want to have a monthly report.\n",
        "In other words, we want windows at a monthly frequency, but each window should cover the last three months.\n",
        "\n",
        "[`Sliding windows`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.window.html#apache_beam.transforms.window.SlidingWindows)\n",
        "allow us to do just that.\n",
        "We need to specify the _window size_ in seconds just like with `FixedWindows`. We also need to specify a _window period_ in seconds, which is how often we want to emit each window."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "ue19z9wGg2-f",
        "outputId": "1f99937f-ead9-485f-84b9-8a1da7ae8f1f"
      },
      "source": [
        "import apache_beam as beam\n",
        "from datetime import timedelta\n",
        "\n",
        "# Sliding windows of approximately 3 months every month.\n",
        "window_size = timedelta(days=3*30).total_seconds()  # in seconds\n",
        "window_period = timedelta(days=30).total_seconds()  # in seconds\n",
        "print(f'window_size:   {window_size} seconds')\n",
        "print(f'window_period: {window_period} seconds')\n",
        "\n",
        "with beam.Pipeline() as pipeline:\n",
        "  (\n",
        "      pipeline\n",
        "      | 'Astronomical events' >> AstronomicalEvents()\n",
        "      | 'Sliding windows' >> beam.WindowInto(\n",
        "          beam.window.SlidingWindows(window_size, window_period)\n",
        "      )\n",
        "      | 'Print element info' >> beam.ParDo(PrintElementInfo())\n",
        "      | 'Print window info' >> PrintWindowInfo()\n",
        "  )"
      ],
      "execution_count": 12,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "window_size:   7776000.0 seconds\n",
            "window_period: 2592000.0 seconds\n",
            "[2021-03-04 00:00:00 - 2021-06-02 00:00:00] 2021-03-20 03:37:00 -- March Equinox 2021\n",
            "[2021-02-02 00:00:00 - 2021-05-03 00:00:00] 2021-03-20 03:37:00 -- March Equinox 2021\n",
            "[2021-01-03 00:00:00 - 2021-04-03 00:00:00] 2021-03-20 03:37:00 -- March Equinox 2021\n",
            "[2021-04-03 00:00:00 - 2021-07-02 00:00:00] 2021-04-26 22:31:00 -- Super full moon\n",
            "[2021-03-04 00:00:00 - 2021-06-02 00:00:00] 2021-04-26 22:31:00 -- Super full moon\n",
            "[2021-02-02 00:00:00 - 2021-05-03 00:00:00] 2021-04-26 22:31:00 -- Super full moon\n",
            "[2021-05-03 00:00:00 - 2021-08-01 00:00:00] 2021-05-11 13:59:00 -- Micro new moon\n",
            "[2021-04-03 00:00:00 - 2021-07-02 00:00:00] 2021-05-11 13:59:00 -- Micro new moon\n",
            "[2021-03-04 00:00:00 - 2021-06-02 00:00:00] 2021-05-11 13:59:00 -- Micro new moon\n",
            "[2021-05-03 00:00:00 - 2021-08-01 00:00:00] 2021-05-26 06:13:00 -- Super full moon, total lunar eclipse\n",
            "[2021-04-03 00:00:00 - 2021-07-02 00:00:00] 2021-05-26 06:13:00 -- Super full moon, total lunar eclipse\n",
            "[2021-03-04 00:00:00 - 2021-06-02 00:00:00] 2021-05-26 06:13:00 -- Super full moon, total lunar eclipse\n",
            "[2021-06-02 00:00:00 - 2021-08-31 00:00:00] 2021-06-20 22:32:00 -- June Solstice 2021\n",
            "[2021-05-03 00:00:00 - 2021-08-01 00:00:00] 2021-06-20 22:32:00 -- June Solstice 2021\n",
            "[2021-04-03 00:00:00 - 2021-07-02 00:00:00] 2021-06-20 22:32:00 -- June Solstice 2021\n",
            "[2021-08-01 00:00:00 - 2021-10-30 00:00:00] 2021-08-22 07:01:00 -- Blue moon\n",
            "[2021-07-02 00:00:00 - 2021-09-30 00:00:00] 2021-08-22 07:01:00 -- Blue moon\n",
            "[2021-06-02 00:00:00 - 2021-08-31 00:00:00] 2021-08-22 07:01:00 -- Blue moon\n",
            "[2021-08-31 00:00:00 - 2021-11-29 00:00:00] 2021-09-22 14:21:00 -- September Equinox 2021\n",
            "[2021-08-01 00:00:00 - 2021-10-30 00:00:00] 2021-09-22 14:21:00 -- September Equinox 2021\n",
            "[2021-07-02 00:00:00 - 2021-09-30 00:00:00] 2021-09-22 14:21:00 -- September Equinox 2021\n",
            "[2021-11-29 00:00:00 - 2022-02-27 00:00:00] 2021-12-21 09:59:00 -- December Solstice 2021\n",
            "[2021-10-30 00:00:00 - 2022-01-28 00:00:00] 2021-12-21 09:59:00 -- December Solstice 2021\n",
            "[2021-09-30 00:00:00 - 2021-12-29 00:00:00] 2021-12-21 09:59:00 -- December Solstice 2021\n",
            "[2021-10-30 00:00:00 - 2022-01-28 00:00:00] 2021-11-04 15:14:00 -- Super new moon\n",
            "[2021-09-30 00:00:00 - 2021-12-29 00:00:00] 2021-11-04 15:14:00 -- Super new moon\n",
            "[2021-08-31 00:00:00 - 2021-11-29 00:00:00] 2021-11-04 15:14:00 -- Super new moon\n",
            "[2021-10-30 00:00:00 - 2022-01-28 00:00:00] 2021-11-19 02:57:00 -- Micro full moon, partial lunar eclipse\n",
            "[2021-09-30 00:00:00 - 2021-12-29 00:00:00] 2021-11-19 02:57:00 -- Micro full moon, partial lunar eclipse\n",
            "[2021-08-31 00:00:00 - 2021-11-29 00:00:00] 2021-11-19 02:57:00 -- Micro full moon, partial lunar eclipse\n",
            "[2021-11-29 00:00:00 - 2022-02-27 00:00:00] 2021-12-04 01:43:00 -- Super new moon\n",
            "[2021-10-30 00:00:00 - 2022-01-28 00:00:00] 2021-12-04 01:43:00 -- Super new moon\n",
            "[2021-09-30 00:00:00 - 2021-12-29 00:00:00] 2021-12-04 01:43:00 -- Super new moon\n",
            "[2021-11-29 00:00:00 - 2022-02-27 00:00:00] 2021-12-18 10:35:00 -- Micro full moon\n",
            "[2021-10-30 00:00:00 - 2022-01-28 00:00:00] 2021-12-18 10:35:00 -- Micro full moon\n",
            "[2021-09-30 00:00:00 - 2021-12-29 00:00:00] 2021-12-18 10:35:00 -- Micro full moon\n",
            ">> Window [2021-03-04 00:00:00 - 2021-06-02 00:00:00] has 4 elements\n",
            ">> Window [2021-02-02 00:00:00 - 2021-05-03 00:00:00] has 2 elements\n",
            ">> Window [2021-01-03 00:00:00 - 2021-04-03 00:00:00] has 1 elements\n",
            ">> Window [2021-04-03 00:00:00 - 2021-07-02 00:00:00] has 4 elements\n",
            ">> Window [2021-05-03 00:00:00 - 2021-08-01 00:00:00] has 3 elements\n",
            ">> Window [2021-06-02 00:00:00 - 2021-08-31 00:00:00] has 2 elements\n",
            ">> Window [2021-08-01 00:00:00 - 2021-10-30 00:00:00] has 2 elements\n",
            ">> Window [2021-07-02 00:00:00 - 2021-09-30 00:00:00] has 2 elements\n",
            ">> Window [2021-08-31 00:00:00 - 2021-11-29 00:00:00] has 3 elements\n",
            ">> Window [2021-11-29 00:00:00 - 2022-02-27 00:00:00] has 3 elements\n",
            ">> Window [2021-10-30 00:00:00 - 2022-01-28 00:00:00] has 5 elements\n",
            ">> Window [2021-09-30 00:00:00 - 2021-12-29 00:00:00] has 5 elements\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "zl4CEcL-Xsxc"
      },
      "source": [
        "![Sliding windows]()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "_8BpHRGmBYon"
      },
      "source": [
        "A thing to note with `SlidingWindows` is that one element might be processed multiple times because it might overlap in more than one window.\n",
        "\n",
        "In our example, the _\"processing\"_ is done by `PrintElementInfo` which simply prints the element with its window information. For windows of three months every month, each element is processed three times, one time per window.\n",
        "\n",
        "In many cases, if we're just doing simple element-wise operations, this isn't generally an issue.\n",
        "But for more resource-intensive transformations, it might be a good idea to perform those transformations _before_ doing the windowing."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "DpFxVHD9CZ-7",
        "outputId": "7bfe9cb4-f88e-4d41-e75b-59637ba72534"
      },
      "source": [
        "import apache_beam as beam\n",
        "from datetime import timedelta\n",
        "\n",
        "# Sliding windows of approximately 3 months every month.\n",
        "window_size = timedelta(days=3*30).total_seconds()  # in seconds\n",
        "window_period = timedelta(days=30).total_seconds()  # in seconds\n",
        "print(f'window_size:   {window_size} seconds')\n",
        "print(f'window_period: {window_period} seconds')\n",
        "\n",
        "with beam.Pipeline() as pipeline:\n",
        "  (\n",
        "      pipeline\n",
        "      | 'Astronomical events' >> AstronomicalEvents()\n",
        "      #------\n",
        "      # ℹ️ Here we're processing / printing the data before windowing.\n",
        "      | 'Print element info' >> beam.ParDo(PrintElementInfo())\n",
        "      | 'Sliding windows' >> beam.WindowInto(\n",
        "          beam.window.SlidingWindows(window_size, window_period)\n",
        "      )\n",
        "      #------\n",
        "      | 'Print window info' >> PrintWindowInfo()\n",
        "  )"
      ],
      "execution_count": 17,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "window_size:   7776000.0 seconds\n",
            "window_period: 2592000.0 seconds\n",
            "[GlobalWindow] 2021-03-20 03:37:00 -- March Equinox 2021\n",
            "[GlobalWindow] 2021-04-26 22:31:00 -- Super full moon\n",
            "[GlobalWindow] 2021-05-11 13:59:00 -- Micro new moon\n",
            "[GlobalWindow] 2021-05-26 06:13:00 -- Super full moon, total lunar eclipse\n",
            "[GlobalWindow] 2021-06-20 22:32:00 -- June Solstice 2021\n",
            "[GlobalWindow] 2021-08-22 07:01:00 -- Blue moon\n",
            "[GlobalWindow] 2021-09-22 14:21:00 -- September Equinox 2021\n",
            "[GlobalWindow] 2021-12-21 09:59:00 -- December Solstice 2021\n",
            "[GlobalWindow] 2021-11-04 15:14:00 -- Super new moon\n",
            "[GlobalWindow] 2021-11-19 02:57:00 -- Micro full moon, partial lunar eclipse\n",
            "[GlobalWindow] 2021-12-04 01:43:00 -- Super new moon\n",
            "[GlobalWindow] 2021-12-18 10:35:00 -- Micro full moon\n",
            ">> Window [2021-03-04 00:00:00 - 2021-06-02 00:00:00] has 4 elements\n",
            ">> Window [2021-02-02 00:00:00 - 2021-05-03 00:00:00] has 2 elements\n",
            ">> Window [2021-01-03 00:00:00 - 2021-04-03 00:00:00] has 1 elements\n",
            ">> Window [2021-04-03 00:00:00 - 2021-07-02 00:00:00] has 4 elements\n",
            ">> Window [2021-05-03 00:00:00 - 2021-08-01 00:00:00] has 3 elements\n",
            ">> Window [2021-06-02 00:00:00 - 2021-08-31 00:00:00] has 2 elements\n",
            ">> Window [2021-08-01 00:00:00 - 2021-10-30 00:00:00] has 2 elements\n",
            ">> Window [2021-07-02 00:00:00 - 2021-09-30 00:00:00] has 2 elements\n",
            ">> Window [2021-08-31 00:00:00 - 2021-11-29 00:00:00] has 3 elements\n",
            ">> Window [2021-11-29 00:00:00 - 2022-02-27 00:00:00] has 3 elements\n",
            ">> Window [2021-10-30 00:00:00 - 2022-01-28 00:00:00] has 5 elements\n",
            ">> Window [2021-09-30 00:00:00 - 2021-12-29 00:00:00] has 5 elements\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Zx2k1uhyDKvY"
      },
      "source": [
        "Note that by doing the windowing _after_ the processing, we only process / print the elments once, but the windowing afterwards is the same."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "-XaTV5D2pM5f"
      },
      "source": [
        "# Session windows\n",
        "\n",
        "Maybe we don't want regular windows, but instead, have the windows reflect periods where activity happened.\n",
        "\n",
        "[`Sessions`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.window.html#apache_beam.transforms.window.Sessions)\n",
        "allow us to create those kinds of windows.\n",
        "We now have to specify a _gap size_ in seconds, which is the maximum number of seconds of inactivity to close a session window.\n",
        "\n",
        "For example, if we specify a gap size of 30 days. The first event would open a new session window since there are no already opened windows. If the next event happens within the next 30 days or less, like 20 days after the previous event, the session window extends and covers that as well. If there are no new events for the next 30 days, the session window closes and is emitted."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "O41tAUwhjaLX",
        "outputId": "d9c0b886-1e47-459b-a64c-94e71fd6075e"
      },
      "source": [
        "import apache_beam as beam\n",
        "from datetime import timedelta\n",
        "\n",
        "# Sessions divided by approximately 1 month gaps.\n",
        "gap_size = timedelta(days=30).total_seconds()  # in seconds\n",
        "print(f'gap_size: {gap_size} seconds')\n",
        "\n",
        "with beam.Pipeline() as pipeline:\n",
        "  (\n",
        "      pipeline\n",
        "      | 'Astronomical events' >> AstronomicalEvents()\n",
        "      | 'Session windows' >> beam.WindowInto(beam.window.Sessions(gap_size))\n",
        "      | 'Print element info' >> beam.ParDo(PrintElementInfo())\n",
        "      | 'Print window info' >> PrintWindowInfo()\n",
        "  )"
      ],
      "execution_count": 19,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "gap_size: 2592000.0 seconds\n",
            "[2021-03-20 03:37:00 - 2021-04-19 03:37:00] 2021-03-20 03:37:00 -- March Equinox 2021\n",
            "[2021-04-26 22:31:00 - 2021-05-26 22:31:00] 2021-04-26 22:31:00 -- Super full moon\n",
            "[2021-05-11 13:59:00 - 2021-06-10 13:59:00] 2021-05-11 13:59:00 -- Micro new moon\n",
            "[2021-05-26 06:13:00 - 2021-06-25 06:13:00] 2021-05-26 06:13:00 -- Super full moon, total lunar eclipse\n",
            "[2021-06-20 22:32:00 - 2021-07-20 22:32:00] 2021-06-20 22:32:00 -- June Solstice 2021\n",
            "[2021-08-22 07:01:00 - 2021-09-21 07:01:00] 2021-08-22 07:01:00 -- Blue moon\n",
            "[2021-09-22 14:21:00 - 2021-10-22 14:21:00] 2021-09-22 14:21:00 -- September Equinox 2021\n",
            "[2021-12-21 09:59:00 - 2022-01-20 09:59:00] 2021-12-21 09:59:00 -- December Solstice 2021\n",
            "[2021-11-04 15:14:00 - 2021-12-04 15:14:00] 2021-11-04 15:14:00 -- Super new moon\n",
            "[2021-11-19 02:57:00 - 2021-12-19 02:57:00] 2021-11-19 02:57:00 -- Micro full moon, partial lunar eclipse\n",
            "[2021-12-04 01:43:00 - 2022-01-03 01:43:00] 2021-12-04 01:43:00 -- Super new moon\n",
            "[2021-12-18 10:35:00 - 2022-01-17 10:35:00] 2021-12-18 10:35:00 -- Micro full moon\n",
            ">> Window [2021-03-20 03:37:00 - 2021-04-19 03:37:00] has 1 elements\n",
            ">> Window [2021-04-26 22:31:00 - 2021-07-20 22:32:00] has 4 elements\n",
            ">> Window [2021-08-22 07:01:00 - 2021-09-21 07:01:00] has 1 elements\n",
            ">> Window [2021-09-22 14:21:00 - 2021-10-22 14:21:00] has 1 elements\n",
            ">> Window [2021-11-04 15:14:00 - 2022-01-20 09:59:00] has 5 elements\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "eJeoBghkJ_1O"
      },
      "source": [
        "![Sessions]()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "9YE0HIKAvXR3"
      },
      "source": [
        "# What's next?\n",
        "\n",
        "* [Windowing](https://beam.apache.org/documentation/programming-guide/#windowing) -- learn more about windowing in the Beam Programming Guide.\n",
        "* [Triggers](https://beam.apache.org/documentation/programming-guide/#triggers) -- learn about triggers in the Beam Programming Guide.\n",
        "* [Transform catalog](https://beam.apache.org/documentation/transforms/python/overview) --\n",
        "  check out all the available transforms.\n",
        "* [Mobile gaming example](https://beam.apache.org/get-started/mobile-gaming-example) --\n",
        "  learn more about windowing, triggers, and streaming through a complete example pipeline.\n",
        "* [Runners](https://beam.apache.org/documentation/runners/capability-matrix) --\n",
        "  check the available runners, their capabilities, and how to run your pipeline in them."
      ]
    }
  ]
}